out_kafka: support AWS MSK IAM authentication #11224
base: master
Conversation
- Switch provider to sync mode before initialization to prevent hanging
- Initialize provider with sync mode (required before event loop is available)
- Switch back to async mode after successful initialization
- Follows pattern used by other AWS credential providers

This fixes potential credential initialization failures in IRSA/EKS deployments where HTTP requests during init would hang without the event loop.

Signed-off-by: Arbin <[email protected]>
- Add force refresh logic to EC2, STS, and EKS credential providers
- Set next_refresh to 0 in refresh functions to ensure immediate credential update
- Fixes MSK IAM authentication failures after ~1 hour due to stale credentials
- Aligns with AWS SDK behavior where refresh() means force refresh

This resolves the issue where OAuth token refresh (every ~15 minutes) would not actually refresh AWS credentials until next_refresh time was reached (typically 1 hour later), causing MSK connection failures with 'Access denied' errors. The fix ensures that every OAuth callback will fetch fresh credentials from AWS, matching the behavior of official AWS SDKs (Python, Java).

Signed-off-by: Arbin <[email protected]>
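For illustration only, here is a minimal standalone sketch of the two patterns these commits describe; the vtable members (`sync`, `init`, `async`) and the `next_refresh` field follow the commit text, not the exact fluent-bit structs:

```c
#include <time.h>

/* Simplified stand-ins for the fluent-bit AWS provider types (hypothetical). */
struct sketch_provider;

struct sketch_provider_vtable {
    int  (*init)(struct sketch_provider *p);
    void (*sync)(struct sketch_provider *p);   /* blocking HTTP, no event loop needed */
    void (*async)(struct sketch_provider *p);  /* event-loop driven HTTP */
};

struct sketch_provider {
    struct sketch_provider_vtable *vt;
    time_t next_refresh;                       /* next time credentials are fetched */
};

/* Pattern 1: initialize in sync mode, then return to async mode. */
static int provider_setup(struct sketch_provider *p)
{
    int ret;

    p->vt->sync(p);      /* avoid hanging: the event loop is not available yet */
    ret = p->vt->init(p);
    p->vt->async(p);     /* normal operation after init */

    return ret;
}

/* Pattern 2: refresh() means "force refresh" - expire the cached credentials. */
static void provider_force_refresh(struct sketch_provider *p)
{
    p->next_refresh = 0; /* next get_credentials() call fetches fresh credentials */
}
```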
…uild fails Signed-off-by: Arbin <[email protected]>
…entials Signed-off-by: Arbin <[email protected]>
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Replaced MSK cluster-ARN model with broker-derived region/serverless detection; introduced TLS-backed provider lifecycle and synchronized credential refresh in MSK IAM; updated OAuth callback API to accept brokers/opaque; plugins register MSK IAM only when broker hostnames match AWS MSK patterns and when using OAUTHBEARER; removed legacy config keys.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Plugin as Kafka Plugin (in/out)
    participant rdkafka as librdkafka
    participant OauthCB as flb_aws_msk_iam
    participant AWSProv as AWS Provider (IMDS/STS/Profile)
    participant TLS as TLS layer
    Plugin->>rdkafka: set sasl.mechanism (detect aws_msk_iam → OAUTHBEARER)
    Plugin->>rdkafka: enable sasl.queue / background callbacks
    Plugin->>OauthCB: register oauthbearer callback (opaque: plugin ctx, brokers)
    Note right of OauthCB: derive region & serverless from broker hostnames
    OauthCB->>TLS: initialize TLS context (once at register)
    OauthCB->>AWSProv: init/reuse provider (with TLS)
    OauthCB->>OauthCB: lock mutex
    OauthCB->>AWSProv: refresh/get credentials (synchronized)
    AWSProv-->>OauthCB: return temporary credentials
    OauthCB->>OauthCB: build presigned payload (host, region, creds)
    OauthCB-->>rdkafka: deliver oauthbearer token to librdkafka
    rdkafka->>Plugin: authenticate to MSK broker using token
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) · ✅ Passed checks (2 passed)
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/aws/flb_aws_msk_iam.c (1)
664-669: Redundant null check. The check at line 666 (`if (!ctx->region)`) is redundant since `region_str` was already validated as non-null at lines 651-659 before assignment to `ctx->region` at line 664. While harmless, it could be removed for clarity.

```diff
     ctx->region = region_str;
-    if (!ctx->region) {
-        flb_free(ctx);
-        return NULL;
-    }
-
     /* Create TLS instance */
```

plugins/in_kafka/in_kafka.c (1)
344-370: Consider making OAuth callback registration failure fatal. If `flb_aws_msk_iam_register_oauth_cb` fails (line 357), the code logs an error but continues initialization. Since MSK IAM is explicitly configured, proceeding without proper authentication will result in connection failures.

```diff
     if (!ctx->msk_iam) {
         flb_plg_error(ins, "failed to setup MSK IAM authentication");
+        goto init_error;
     }
     else {
```

plugins/out_kafka/kafka_config.c (1)
206-239: Same concern as in_kafka: OAuth callback registration failure should be fatal. Similar to the input plugin, if MSK IAM setup fails at line 225, the code logs but continues. This will lead to connection failures at runtime.

Additionally, `rd_kafka_conf_enable_sasl_queue` at line 218 is correctly called before OAuth callback registration to enable background thread handling.

```diff
     if (!ctx->msk_iam) {
         flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication");
+        flb_out_kafka_destroy(ctx);
+        return NULL;
     }
     else {
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- include/fluent-bit/aws/flb_aws_msk_iam.h (0 hunks)
- plugins/in_kafka/in_kafka.c (2 hunks)
- plugins/in_kafka/in_kafka.h (0 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (3 hunks)
- plugins/out_kafka/kafka_config.h (0 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_profile.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (11 hunks)
💤 Files with no reviewable changes (4)
- include/fluent-bit/aws/flb_aws_msk_iam.h
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.h
- plugins/in_kafka/in_kafka.h
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (2)
src/aws/flb_aws_msk_iam.c (6)
- src/flb_sds.c (4): flb_sds_create_len (58-76), flb_sds_create_size (92-95), flb_sds_printf (336-387), flb_sds_destroy (389-399)
- src/aws/flb_aws_credentials.c (2): flb_aws_credentials_destroy (752-767), flb_standard_chain_provider_create (269-327)
- include/fluent-bit/flb_mem.h (2): flb_calloc (84-96), flb_free (126-128)
- src/tls/flb_tls.c (2): flb_tls_create (183-232), flb_tls_destroy (258-277)
- src/aws/flb_aws_util.c (1): flb_aws_client_generator (280-283)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)

plugins/in_kafka/in_kafka.c (3)
- src/flb_input.c (2): flb_input_get_property (776-780), flb_input_set_property (557-774)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (590-716)
🔇 Additional comments (12)
src/aws/flb_aws_credentials_sts.c (1)
478-489:

src/aws/flb_aws_credentials_profile.c (1)
664-673: LGTM - Appropriate log level adjustment for missing credentials file. Downgrading ENOENT from a conditional error/debug to always debug is correct since a missing shared credentials file is a normal condition when using other authentication methods (environment variables, instance profiles, etc.).
src/aws/flb_aws_credentials_ec2.c (1)
132-133: No concerns - formatting change only.

src/aws/flb_aws_msk_iam.c (5)
41-53: LGTM - Token lifetime and struct changes are appropriate. The 5-minute token lifetime aligns with AWS SDK conventions, and the struct changes properly reflect the new broker-derived region approach.
135-188: Region extraction logic looks correct. The function properly handles both MSK Standard (`*.kafka.<region>.amazonaws.com`) and MSK Serverless (`*.kafka-serverless.<region>.amazonaws.com`) broker formats. The backwards scan from `.amazonaws.com` correctly locates the region segment.
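For illustration, a self-contained sketch of this kind of hostname parsing (not the actual `extract_region_from_broker()` code, and it skips the VPC-endpoint special case discussed later):

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* e.g. "b-1.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:9098" -> "us-east-1" */
static char *region_from_broker(const char *broker)
{
    char host[256];
    char *p;

    snprintf(host, sizeof(host), "%s", broker);

    p = strchr(host, ':');             /* drop the ":port" suffix */
    if (p) {
        *p = '\0';
    }

    p = strstr(host, ".amazonaws.com");
    if (!p) {
        return NULL;                   /* not an AWS endpoint */
    }
    *p = '\0';                         /* host now ends just before ".amazonaws.com" */

    p = strrchr(host, '.');            /* the label right before the suffix is the region */
    if (!p || *(p + 1) == '\0') {
        return NULL;
    }
    return strdup(p + 1);
}
```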
190-488: Payload generation logic is correctly implemented. The SigV4 signing, canonical request construction, and base64 URL encoding follow AWS specifications. Resource cleanup in both success and error paths is thorough.
490-587: OAuth callback implementation is well-structured. The callback properly:

- Refreshes credentials before retrieval (lines 532-536)
- Destroys credentials after use (lines 550, 573)
- Handles all error paths with appropriate `rd_kafka_oauthbearer_set_token_failure` calls
- Uses a fixed 5-minute lifetime which librdkafka will refresh at ~80% (4 minutes)
718-737: LGTM - Proper resource cleanup. The destroy function correctly cleans up all allocated resources (provider, TLS, region) with appropriate null checks.

plugins/in_kafka/in_kafka.c (2)
plugins/in_kafka/in_kafka.c (2)
271-294: LGTM - SASL mechanism conversion logic is correct. The automatic conversion from `aws_msk_iam` to `OAUTHBEARER` and defaulting `security.protocol` to `SASL_SSL` provides the expected simplified configuration experience.
336-342: LGTM - Opaque context correctly includes Kafka context. Passing `&ctx->kafka` to `flb_kafka_opaque_set` enables the MSK IAM module to access broker configuration for region extraction.

plugins/out_kafka/kafka_config.c (2)
61-84: LGTM - Consistent SASL mechanism handling with in_kafka. The conversion logic mirrors the input plugin implementation, ensuring consistent behavior across both Kafka plugins.
251-270: LGTM - SASL background callbacks handling is appropriate. Enabling background callbacks ensures OAuth tokens refresh even on idle connections. Making failure non-fatal is reasonable since token refresh will still work through `rd_kafka_poll`, though with less reliability on idle connections.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Actionable comments posted: 0
🧹 Nitpick comments (2)
plugins/in_kafka/in_kafka.c (1)
271-297: aws_msk_iam is now explicit opt‑in and mapped to OAUTHBEARER

The init logic correctly:

- Reads `rdkafka.sasl.mechanism`.
- Treats `aws_msk_iam` as an alias, setting `ctx->aws_msk_iam = FLB_TRUE`.
- Converts the effective mechanism to `OAUTHBEARER` (for librdkafka) and defaults `rdkafka.security.protocol` to `SASL_SSL` when unset.

This keeps MSK IAM behind an explicit opt‑in flag while exposing the standard OAUTHBEARER mechanism to librdkafka, which is the right direction.

One caveat: using `flb_input_set_property()` to overwrite `rdkafka.sasl.mechanism` will leave the original `aws_msk_iam` entry in `ins->properties`. That means `flb_kafka_conf_create()` may see two `rdkafka.sasl.mechanism` entries, likely causing an "invalid value" log for the `aws_msk_iam` entry even though the configuration is valid.

Consider updating the existing KV in `ins->properties` (or overriding via `rd_kafka_conf_set()` after `flb_kafka_conf_create()`) instead of appending a new entry, to avoid confusing error logs.

plugins/out_kafka/kafka_config.c (1)
61-87: aws_msk_iam aliasing and TLS defaults look good; watch for duplicate property noise

The output plugin mirrors the input side:

- Reads `rdkafka.sasl.mechanism`.
- Treats `aws_msk_iam` as an alias, setting `ctx->aws_msk_iam = FLB_TRUE`.
- Sets the effective mechanism to `OAUTHBEARER` and defaults `rdkafka.security.protocol` to `SASL_SSL` when unset.
- Stores the effective mechanism in `ctx->sasl_mechanism` for later checks.

As with the input plugin, using `flb_output_set_property()` here means both the original `aws_msk_iam` and the rewritten `OAUTHBEARER` entries will exist in `ins->properties`, and `flb_kafka_conf_create()` will see both. This can generate a spurious "invalid value" log when librdkafka rejects `aws_msk_iam` as a SASL mechanism.

If you want to avoid that noise, consider:

- Updating the existing KV for `rdkafka.sasl.mechanism` in-place, or
- Overriding the final value via `rd_kafka_conf_set(ctx->conf, "sasl.mechanism", "OAUTHBEARER", ...)` after `flb_kafka_conf_create()` instead of adding a second property.
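A minimal sketch of that second option, using librdkafka's `rd_kafka_conf_set()` directly on the conf returned by `flb_kafka_conf_create()` (error logging omitted):

```c
#include <librdkafka/rdkafka.h>

/* Override the effective SASL mechanism on the rd_kafka_conf_t instead of
 * appending a second "rdkafka.sasl.mechanism" property to ins->properties. */
static int override_sasl_mechanism(rd_kafka_conf_t *conf)
{
    char errstr[256];

    if (rd_kafka_conf_set(conf, "sasl.mechanism", "OAUTHBEARER",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        return -1;   /* errstr holds the reason */
    }
    return 0;
}
```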
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
- plugins/in_kafka/in_kafka.c (4 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_profile.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (12 hunks)
- src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
- plugins/out_kafka/kafka.c
🚧 Files skipped from review as they are similar to previous changes (3)
- plugins/out_kafka/kafka_config.h
- include/fluent-bit/aws/flb_aws_msk_iam.h
- plugins/in_kafka/in_kafka.h
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c, src/flb_kafka.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (2)
plugins/in_kafka/in_kafka.c (4)
- src/flb_input.c (2): flb_input_get_property (776-780), flb_input_set_property (557-774)
- src/flb_sds.c (2): flb_sds_create (78-90), flb_sds_destroy (389-399)
- src/aws/flb_aws_msk_iam.c (2): flb_aws_msk_iam_register_oauth_cb (628-761), flb_aws_msk_iam_destroy (764-786)
- src/flb_kafka.c (1): flb_kafka_opaque_destroy (233-240)

plugins/out_kafka/kafka_config.c (3)
- src/flb_output.c (2): flb_output_get_property (1108-1111), flb_output_set_property (843-1068)
- src/flb_sds.c (2): flb_sds_create (78-90), flb_sds_destroy (389-399)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (628-761)
🪛 Cppcheck (2.18.0)

Informational findings only (no warnings or errors) across src/aws/flb_aws_msk_iam.c, plugins/in_kafka/in_kafka.c, plugins/out_kafka/kafka_config.c, src/aws/flb_aws_credentials_ec2.c, src/aws/flb_aws_credentials_profile.c, src/aws/flb_aws_credentials_sts.c and src/flb_kafka.c: missingIncludeSystem on include directives, normalCheckLevelMaxBranches (branch analysis limited; use --check-level=exhaustive to analyze all branches) and toomanyconfigs (cppcheck only checks 12 #ifdef configurations; use --force to check all configurations).
🔇 Additional comments (16)
src/aws/flb_aws_credentials_sts.c (1)
173-186: Formatting-only change; behavior unchanged

The added blank lines after the debug logs in `refresh_fn_sts` and `refresh_fn_eks` are cosmetic and do not affect control flow or error handling. No further changes needed.

Also applies to: 478-489
src/flb_kafka.c (1)
96-100: Correctly destroy `rd_kafka_conf_t` on error

Switching from `flb_free(kafka_cfg)` to `rd_kafka_conf_destroy(kafka_cfg)` in the error path aligns with how `rd_kafka_conf_t` is created and should be freed. This avoids undefined behavior and potential leaks when `rd_kafka_conf_new()` succeeds but `flb_kafka_conf_create()` fails later.

Please double-check against the librdkafka `rd_kafka_conf_new` / `rd_kafka_conf_destroy` docs that `rd_kafka_conf_destroy()` is the expected way to free this object in your target version.

src/aws/flb_aws_credentials_ec2.c (1)
src/aws/flb_aws_credentials_profile.c (1)
646-674: More appropriate log level for missing shared credentials file

When the shared credentials file is absent (`errno == ENOENT`), logging via `AWS_CREDS_DEBUG` instead of `AWS_CREDS_ERROR_OR_DEBUG` avoids spurious error-level noise while still returning an error to the caller. This matches how the shared config path already treats ENOENT and looks good.

plugins/in_kafka/in_kafka.c (3)
347-357: SASL queue + background callbacks for OAUTHBEARER are wired correctly

Enabling:

- `rd_kafka_conf_enable_sasl_queue(kafka_conf, 1)` at config time, and
- `rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk)` after `rd_kafka_new()`

for all OAUTHBEARER uses (not just MSK IAM) is a solid way to ensure OAuth token refresh on idle connections or long poll intervals. Logging both the enable and failure cases is also helpful.

No issues from a lifecycle perspective: both calls are made before producing/consuming, and failures degrade gracefully to the previous behavior.

Please confirm your minimum supported librdkafka version exposes `rd_kafka_conf_enable_sasl_queue()` and `rd_kafka_sasl_background_callbacks_enable()`; otherwise, this will need version guards.

Also applies to: 412-424
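As a rough sketch of that wiring (assuming librdkafka >= 1.9.0, where both functions were introduced; error handling trimmed):

```c
#include <librdkafka/rdkafka.h>

static rd_kafka_t *create_consumer_with_background_oauth(rd_kafka_conf_t *conf)
{
    rd_kafka_t *rk;
    rd_kafka_error_t *error;
    char errstr[512];

    /* Before rd_kafka_new(): route SASL events to an internal background queue. */
    rd_kafka_conf_enable_sasl_queue(conf, 1);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        return NULL;   /* conf was NOT consumed; caller must rd_kafka_conf_destroy() it */
    }

    /* After rd_kafka_new(): let librdkafka refresh OAuth tokens on its own thread. */
    error = rd_kafka_sasl_background_callbacks_enable(rk);
    if (error) {
        /* Non-fatal: tokens still refresh via rd_kafka_poll(). */
        rd_kafka_error_destroy(error);
    }
    return rk;
}
```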
358-389: MSK IAM registration is now correctly gated on explicit opt‑in and AWS brokers

The new registration block only activates MSK IAM when:

- `ctx->aws_msk_iam` is set (i.e., user explicitly configured `rdkafka.sasl.mechanism=aws_msk_iam`), and
- the effective mechanism is `OAUTHBEARER`, and
- brokers look like AWS MSK endpoints (`.kafka.` or `.kafka-serverless.` plus `.amazonaws.com`).

Passing the `brokers` string into `flb_aws_msk_iam_register_oauth_cb()` and cleaning up `ctx->msk_iam` in the `init_error` path closes the earlier resource-leak and "forced MSK IAM on generic OAUTHBEARER" issues. This is a good, minimal, and safe gating.
391-403: Correct handling of `kafka_conf` ownership and MSK IAM cleanup on init failure

Two important fixes here:

- `kafka_conf` is left non-NULL when `rd_kafka_new()` fails and only nulled on success. The `init_error` handler now:
  - Destroys the consumer (`rd_kafka_destroy`) when created, or
  - Calls `rd_kafka_conf_destroy(kafka_conf)` when creation failed.
- `ctx->msk_iam` is now destroyed in the `init_error` path (under `FLB_HAVE_AWS_MSK_IAM`), mirroring `in_kafka_exit()` and preventing leaks of TLS/aws-provider resources when init fails after successful MSK IAM setup.

This aligns with librdkafka's ownership rules and fixes the reported leak.

Given this relies on librdkafka semantics (conf is only consumed on successful `rd_kafka_new()`), please verify against the version you build with.

Also applies to: 486-507
plugins/out_kafka/kafka_config.c (4)
197-208: Opaque usage and SASL queue enablement for OAUTHBEARER are correct

- `ctx->opaque` is initialized and wired into the conf; `flb_kafka_opaque_set(ctx->opaque, ctx, NULL)` keeps `ptr` pointing to the plugin context while leaving `msk_iam_ctx` free for MSK IAM use.
- Enabling the SASL queue for any `ctx->sasl_mechanism == "OAUTHBEARER"` ensures librdkafka's OAuth token refresh machinery can run even when the producer is idle.

This matches the input plugin behavior and is consistent with the new MSK IAM OAuth callback flow.

Also applies to: 209-218
220-250: MSK IAM registration is correctly scoped and cleans up on failure

The producer side only registers MSK IAM when:

- The user explicitly requested it (`ctx->aws_msk_iam`), and
- The effective mechanism is `OAUTHBEARER`, and
- Brokers look like AWS MSK endpoints (`.kafka.` or `.kafka-serverless.` plus `.amazonaws.com`).

If `flb_aws_msk_iam_register_oauth_cb()` fails, calling `flb_out_kafka_destroy(ctx)` and returning NULL cleanly tears down all partially-initialized resources (conf, opaque, topics, etc.). The follow-up `sasl.oauthbearer.config` setting is also correctly guarded and failure is logged but non-fatal.

This resolves the earlier concern about silently enabling MSK IAM for generic OAUTHBEARER configs.
252-267: rd_kafka_conf ownership and destroy paths are now correct

The updated logic around `rd_kafka_new()` and teardown:

- Leaves `ctx->conf` intact when `rd_kafka_new()` fails so `flb_out_kafka_destroy()` can call `rd_kafka_conf_destroy(ctx->conf)`.
- Sets `ctx->conf = NULL` only on success, relying on `rd_kafka_destroy(ctx->kafka.rk)` to free the configuration later.
- In `flb_out_kafka_destroy()`, destroys `ctx->kafka.rk` when present, otherwise destroys `ctx->conf`.

This matches librdkafka's contract and closes the memory-leak scenario previously flagged in the producer path.

Please validate this against your librdkafka version's `rd_kafka_new()` docs to ensure the "conf consumed only on success" assumption still holds.

Also applies to: 332-351
268-285: Background callbacks and MSK IAM teardown

Enabling `rd_kafka_sasl_background_callbacks_enable()` for OAUTHBEARER and checking its return value provides robust behavior:

- On success, tokens are refreshed even on idle producers.
- On failure, you emit a clear warning but keep the producer usable.

The MSK IAM context (`ctx->msk_iam`) is destroyed in `flb_out_kafka_destroy()`, after `rd_kafka_destroy()`, which ensures the OAuth callback will not be invoked again after the context is freed. This is a sensible ordering.

Because this depends on librdkafka background callback semantics, please confirm that `rd_kafka_destroy()` guarantees no further OAuth callbacks after it returns in your target version.

Also applies to: 363-369
src/aws/flb_aws_msk_iam.c (5)
137-212: Region extraction from broker covers standard, serverless, and VPC endpoint forms

`extract_region_from_broker()`:

- Strips any `:port` suffix.
- Locates `.amazonaws.com`, with a special case for `.vpce.amazonaws.com`.
- Walks back to the previous `.` to isolate the region between that dot and `.amazonaws.com` (or `.vpce` for VPC endpoints).
- Handles:
  - Standard MSK: `...kafka.<region>.amazonaws.com`
  - Serverless: `...kafka-serverless.<region>.amazonaws.com`
  - VPC: `...kafka.<region>.vpce.amazonaws.com`
  - Partitions where the host continues past `amazonaws.com` (e.g. `amazonaws.com.cn`), since the substring match is against `.amazonaws.com`.
.kafka[‑serverless].*amazonaws.com.
215-512: SigV4-style payload builder is consistent and cleans up correctly

`build_msk_iam_payload()` now:

- Validates presence of `config->region`, `host`, and credentials.
- Constructs the credential string `AKIA/yyyymmdd/region/kafka-cluster/aws4_request`.
- Builds the canonical query (Action, X-Amz-Algorithm, Credential, Date, Expires, optional Security-Token, SignedHeaders=host).
- Computes:
  - Empty payload hash.
  - Canonical request over `GET\n/\n<query>\nhost:<host>\n\nhost\n<hash>`.
  - String to sign and signing key using the AWS standard `AWS4` HMAC chain with service `kafka-cluster`.
  - Hex signature and appends `X-Amz-Signature`.
- Constructs the presigned URL and then Base64-encodes it, converting to URL-safe Base64 and trimming padding.
The error label frees every intermediate `flb_sds_t`, the success path also destroys all temporaries, and `payload` is the only returned SDS. No obvious leaks or double-frees, and the logic matches AWS SigV4 for query-signing MSK IAM URLs.

Because correctness here is tied to AWS MSK IAM's SigV4 expectations, please validate this flow (service name `kafka-cluster`, query parameters, and signing steps) against the latest AWS MSK IAM documentation or reference implementation.
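For reference, a self-contained sketch of the SigV4 signing-key chain summarized above, written against OpenSSL's `HMAC()` rather than fluent-bit's internal helpers; the service name `kafka-cluster` and the `AWS4`/`aws4_request` terminators follow the flow described in this comment:

```c
#include <stdio.h>
#include <string.h>
#include <openssl/hmac.h>

#define SHA256_LEN 32

static void hmac_sha256(const unsigned char *key, int key_len,
                        const char *msg, unsigned char out[SHA256_LEN])
{
    unsigned int out_len = SHA256_LEN;

    /* HMAC-SHA256 of msg keyed with key */
    HMAC(EVP_sha256(), key, key_len,
         (const unsigned char *) msg, strlen(msg), out, &out_len);
}

/*
 * kSigning = HMAC(HMAC(HMAC(HMAC("AWS4" + secret, yyyymmdd),
 *                           region), "kafka-cluster"), "aws4_request")
 */
static void derive_signing_key(const char *secret_key, const char *yyyymmdd,
                               const char *region,
                               unsigned char out[SHA256_LEN])
{
    char seed[256];
    unsigned char k_date[SHA256_LEN];
    unsigned char k_region[SHA256_LEN];
    unsigned char k_service[SHA256_LEN];

    snprintf(seed, sizeof(seed), "AWS4%s", secret_key);

    hmac_sha256((const unsigned char *) seed, (int) strlen(seed), yyyymmdd, k_date);
    hmac_sha256(k_date, SHA256_LEN, region, k_region);
    hmac_sha256(k_region, SHA256_LEN, "kafka-cluster", k_service);
    hmac_sha256(k_service, SHA256_LEN, "aws4_request", out);
}
```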
514-625: OAuth refresh callback: concurrency, provider usage, and token lifetime

The new `oauthbearer_token_refresh_cb`:

- Verifies the opaque context and region.
- Derives the MSK host (`kafka-serverless.<region>.amazonaws.com` vs `kafka.<region>.amazonaws.com`) from `config->is_serverless`.
- Uses a dedicated `pthread_mutex_t lock` to serialize access to the AWS provider:
  - Locks `config->lock`.
  - Calls `provider_vtable->refresh()` and `get_credentials()`.
  - Unlocks before building the payload, minimizing contention.
- Builds the MSK IAM payload with the just-fetched credentials.
- Sets an OAuth token lifetime of `now + MSK_IAM_TOKEN_LIFETIME_SECONDS` (5 minutes) and delegates refresh cadence to librdkafka's background callbacks.
- Destroys the temporary credentials and payload in all paths.

This addresses prior race conditions between librdkafka's callback thread and other provider users, and the fixed 5‑minute lifetime aligns with common AWS MSK IAM practice.

Please confirm:

- That your configured AWS provider chain is safe to call from this background thread, and
- That librdkafka's `rd_kafka_oauthbearer_set_token()` semantics (absolute ms since epoch) match how `md_lifetime_ms` is being calculated, in your target version.
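As a skeleton of that callback shape — the credential fetch is reduced to a hypothetical `build_token()` placeholder and the principal name is hard-coded for illustration; `md_lifetime_ms` is absolute wall-clock time in milliseconds:

```c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <librdkafka/rdkafka.h>

#define TOKEN_LIFETIME_SEC 300   /* 5 minutes; librdkafka refreshes at ~80% of lifetime */

extern char *build_token(void);  /* placeholder: presigned, base64url-encoded payload */

static pthread_mutex_t cred_lock = PTHREAD_MUTEX_INITIALIZER;

static void token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config,
                             void *opaque)
{
    char errstr[512];
    char *token;
    int64_t lifetime_ms;

    (void) oauthbearer_config;
    (void) opaque;

    pthread_mutex_lock(&cred_lock);      /* serialize credential refresh */
    token = build_token();
    pthread_mutex_unlock(&cred_lock);

    if (!token) {
        rd_kafka_oauthbearer_set_token_failure(rk, "failed to build MSK IAM token");
        return;
    }

    lifetime_ms = ((int64_t) time(NULL) + TOKEN_LIFETIME_SEC) * 1000;

    if (rd_kafka_oauthbearer_set_token(rk, token, lifetime_ms, "admin",
                                       NULL, 0, errstr, sizeof(errstr)) !=
        RD_KAFKA_RESP_ERR_NO_ERROR) {
        rd_kafka_oauthbearer_set_token_failure(rk, errstr);
    }
    free(token);
}
```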
627-761: Broker-driven registration with TLS-backed provider and mutex is sound

`flb_aws_msk_iam_register_oauth_cb()` now:

- Requires:
  - Non-NULL `opaque`, and
  - Non-empty `brokers` string.
- Extracts the first broker from the CSV list and determines:
  - Region via `extract_region_from_broker()`.
  - Whether it's serverless via `.kafka-serverless.` substring.
- Allocates `struct flb_aws_msk_iam`, stores `config`, `region`, `is_serverless`, and logs what it detected.
- Creates a TLS context (`ctx->cred_tls`) dedicated to credential fetching.
- Builds a standard AWS provider chain (`flb_standard_chain_provider_create`) for the extracted region, calls `sync` + `init`, then switches to `async`.
- Initializes `ctx->lock` to guard provider access from other threads.
- Stores `ctx` in `opaque->msk_iam_ctx` while leaving `opaque->ptr` untouched, and wires the OAuth refresh callback via `rd_kafka_conf_set_oauthbearer_token_refresh_cb()`.

All error paths:

- Free `first_broker` and `region_str` as appropriate.
- Destroy TLS and provider when partially initialized.
- Avoid leaving a half-initialized context behind.

The resulting design is broker-driven (no cluster ARN), supports Standard and Serverless, and encapsulates TLS and provider lifecycles cleanly.

Given this depends on `flb_standard_chain_provider_create()` and `flb_tls_create()` semantics, please verify that:

- The chosen TLS settings are appropriate for IMDS/STS endpoints in your environment, and
- The provider chain you construct here is the same one you rely on elsewhere for AWS integrations.
763-785: Destroy path fully matches new allocations

`flb_aws_msk_iam_destroy()` now:

- Destroys `ctx->provider`.
- Destroys the TLS context `ctx->cred_tls`.
- Frees `ctx->region`.
- Destroys the mutex.
- Frees the struct itself.

This mirrors all allocations made in `flb_aws_msk_iam_register_oauth_cb()` and avoids leaks or dangling state.
Actionable comments posted: 1
🧹 Nitpick comments (4)
plugins/in_exec_wasi/in_exec_wasi.c (1)
456-459: Consider backward‑compatible alias for renamed `oneshot` config key

Renaming the config key from the previous name (per summary: `"bool"`) to `"oneshot"` will break existing configurations that still use the old key. If this plugin is already in use, consider keeping a deprecated alias entry pointing at the same field to avoid surprise breakage, e.g.:

```diff
@@ static struct flb_config_map config_map[] = {
-    {
-     FLB_CONFIG_MAP_BOOL, "oneshot", "false",
-     0, FLB_TRUE, offsetof(struct flb_exec_wasi, oneshot),
-     "execute the command only once"
-    },
+    {
+     FLB_CONFIG_MAP_BOOL, "oneshot", "false",
+     0, FLB_TRUE, offsetof(struct flb_exec_wasi, oneshot),
+     "execute the command only once"
+    },
+    /* Backward‑compat alias (deprecated) */
+    {
+     FLB_CONFIG_MAP_BOOL, "bool", "false",
+     0, FLB_TRUE, offsetof(struct flb_exec_wasi, oneshot),
+     "[deprecated] use 'oneshot'; execute the command only once"
+    },
```

If you intentionally want a breaking change here, at least ensure the docs call out the rename explicitly.
.github/scripts/commit_prefix_check.py (1)
176-193: Verify umbrella prefix coverage for your workflow. The `umbrella_prefixes` set only includes `"lib:"`. Consider whether `"tests:"` should also be an umbrella prefix to allow commits that modify multiple test files without requiring split commits.

Currently, a commit touching `tests/unit/test_a.c` and `tests/runtime/test_b.c` would infer `{"unit:", "runtime:"}` as non-build prefixes (if those are subdirectories), potentially requiring a split. If that's intentional, this is fine; otherwise, you may want to expand umbrella handling.

plugins/in_kafka/in_kafka.c (1)
359-388: Consider surfacing a clear error when aws_msk_iam is requested for non‑MSK brokers

The new gating (`ctx->aws_msk_iam` flag + AWS broker pattern) correctly prevents MSK IAM from auto‑enabling on generic OAUTHBEARER setups. One remaining corner case is when users configure `rdkafka.sasl.mechanism=aws_msk_iam` but point `brokers` at a non‑AWS cluster: IAM registration is skipped and the consumer ends up as plain OAUTHBEARER without any token provider. Emitting a clear log error (and possibly failing init) in that mismatch case would make misconfiguration much easier to diagnose.

plugins/out_kafka/kafka_config.c (1)
209-218: Good MSK IAM gating; consider explicit error on aws_msk_iam + non‑MSK brokers

Enabling the SASL queue and background callbacks for all OAUTHBEARER configs and only registering MSK IAM when both `aws_msk_iam` is set and brokers match AWS MSK host patterns is the right balance between safety and convenience. As with the input plugin, a small improvement would be to log a clear error (or fail init) when `aws_msk_iam` is requested but the brokers string does not look like an MSK endpoint, to avoid confusing OAUTHBEARER failures.

Also applies to: 220-250
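A small sketch of the broker-gating heuristic both of these comments refer to (hypothetical helper name):

```c
#include <string.h>

/* Treat the endpoint as MSK only when the bootstrap host carries an MSK label
 * and sits under amazonaws.com, e.g. "b-1....kafka.eu-west-1.amazonaws.com:9098". */
static int brokers_look_like_msk(const char *brokers)
{
    if (!brokers || !strstr(brokers, ".amazonaws.com")) {
        return 0;
    }
    return strstr(brokers, ".kafka.") != NULL ||
           strstr(brokers, ".kafka-serverless.") != NULL;
}
```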
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
- .github/scripts/commit_prefix_check.py (4 hunks)
- .github/scripts/tests/test_commit_lint.py (4 hunks)
- .github/workflows/commit-lint.yaml (2 hunks)
- CMakeLists.txt (1 hunks)
- dockerfiles/Dockerfile (11 hunks)
- include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
- plugins/in_elasticsearch/in_elasticsearch.c (1 hunks)
- plugins/in_exec_wasi/in_exec_wasi.c (1 hunks)
- plugins/in_forward/fw.c (3 hunks)
- plugins/in_forward/fw_prot.c (2 hunks)
- plugins/in_kafka/in_kafka.c (4 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/in_node_exporter_metrics/ne_utils.c (1 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (12 hunks)
- src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
- plugins/out_kafka/kafka.c
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
CMakeLists.txt, dockerfiles/Dockerfile, src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.
Applied to files:
CMakeLists.txt
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
dockerfiles/Dockerfile, src/aws/flb_aws_msk_iam.c, src/flb_kafka.c
📚 Learning: 2025-09-14T09:46:09.531Z
Learnt from: aminvakil
Repo: fluent/fluent-bit PR: 10844
File: conf/fluent-bit:13-15
Timestamp: 2025-09-14T09:46:09.531Z
Learning: For fluent-bit Debian packaging, /opt/fluent-bit/bin/ is the appropriate installation path since the package may be installed from non-official Debian sources, making /opt compliant with FHS for optional software packages.
Applied to files:
dockerfiles/Dockerfile
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (4)
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (628-761)

.github/scripts/tests/test_commit_lint.py (1)
- .github/scripts/commit_prefix_check.py (2): infer_prefix_from_paths (30-85), validate_commit (123-213)

src/aws/flb_aws_msk_iam.c (1)
- src/aws/flb_aws_util.c (1): flb_aws_client_generator (280-283)

plugins/in_kafka/in_kafka.c (2)
- src/flb_input.c (2): flb_input_get_property (776-780), flb_input_set_property (557-774)
- src/aws/flb_aws_msk_iam.c (2): flb_aws_msk_iam_register_oauth_cb (628-761), flb_aws_msk_iam_destroy (764-786)
🪛 Cppcheck (2.18.0)

Informational findings only (no warnings or errors) across plugins/out_kafka/kafka_config.c, plugins/in_node_exporter_metrics/ne_utils.c, plugins/in_forward/fw_prot.c, plugins/in_elasticsearch/in_elasticsearch.c, plugins/in_exec_wasi/in_exec_wasi.c, src/aws/flb_aws_msk_iam.c, plugins/in_forward/fw.c, plugins/in_kafka/in_kafka.c, src/aws/flb_aws_credentials_ec2.c, src/aws/flb_aws_credentials_sts.c and src/flb_kafka.c: missingIncludeSystem on include directives, normalCheckLevelMaxBranches (branch analysis limited; use --check-level=exhaustive to analyze all branches) and toomanyconfigs (cppcheck only checks 12 #ifdef configurations; use --force to check all configurations).
🔇 Additional comments (25)
plugins/in_elasticsearch/in_elasticsearch.c (1)
240-240: Unrelated change: This file is outside the PR's stated scope. This documentation improvement is correct and helpful, but the Elasticsearch input plugin appears unrelated to the PR's stated objective of implementing AWS MSK IAM authentication for the Kafka plugins.
Was this change intentionally included, or should it be moved to a separate PR for documentation improvements?
.github/workflows/commit-lint.yaml (1)
16-16: ⚠️ Breaking changes in actions/setup-python@v6 require verification before merging. The actions/setup-python@v6 upgrade introduces a breaking change: the Node.js runtime moves from v20 to v24, which requires GitHub Actions runner v2.327.1 or later. actions/checkout@v6 also requires runner v2.329.0+ for authenticated git operations. Before deploying:
- Verify your GitHub Actions runners meet the minimum version requirement (v2.329.0+)
- Test this workflow on a branch with the appropriate runner version
- Review and test any caching steps; there are known compatibility issues between actions/setup-python@v6 and some cache integrations, so migrate to actions/cache@v4+ if needed
The commit prefix validation failures in PR comments are unrelated to these version upgrades; they reflect commit message format issues in the actual commits.
plugins/in_forward/fw.c (1)
484-519: Changes appear unrelated to PR scope but are technically correct. The config map description improvements and the FLB_CONFIG_MAP_MULT flag for security.users are technically sound. However, this forward input plugin has no connection to Kafka or AWS MSK IAM authentication, which is the stated purpose of this PR. Consider whether these changes should be in a separate PR focused on forward plugin improvements.
plugins/in_forward/fw_prot.c (1)
1148-1153: Trace decode error handling and ownership transfer look correct. Switching the check to ret != CTR_DECODE_MSGPACK_SUCCESS brings the traces path in line with the metrics path and correctly treats any non-success as a hard failure. Removing ctr_decode_msgpack_destroy(ctr) on the success path (while still destroying ctr on append failure) avoids a double-free and matches the documented ownership transfer to flb_input_trace_append().
One thing to confirm: ctr_decode_msgpack_create() should only initialize/allocate ctr on CTR_DECODE_MSGPACK_SUCCESS, so returning immediately on non-success cannot leak anything. If that contract ever changes, a destroy would need to be added in the error path as well.
Also applies to: 1158-1163
src/aws/flb_aws_credentials_ec2.c (1)
128-139: LGTM! Minor formatting change (empty line addition) with no functional impact.
src/aws/flb_aws_credentials_sts.c (2)
173-186: LGTM! Minor formatting change (empty line addition) with no functional impact on the STS provider refresh logic.
478-489: LGTM! Minor formatting change (empty line addition) consistent with other provider implementations.
CMakeLists.txt (1)
86-93: Good security hardening addition. Enabling --noexecstack by default on Linux prevents executable stack warnings and strengthens security posture. The guard correctly excludes macOS, where these linker flags aren't supported.
Note: Similar flags exist in the FLB_WASM_STACK_PROTECT (line 101) and FLB_SECURITY (line 502) sections, but having them unconditionally applied ensures baseline protection regardless of those optional settings.
.github/scripts/commit_prefix_check.py (2)
30-85: Well-structured refactor of prefix inference. The separation of component_prefixes and build_seen tracking provides clear semantics:
- Component prefixes track actual code changes (lib, tests, plugins, src)
- build_optional correctly indicates when the build: prefix can be omitted
The return type change from a single set to a tuple is a breaking change, but the corresponding test updates handle this appropriately.
64-73: Verify src/ subdirectory prefix inference behavior. For src/ files not starting with flb_, the subdirectory name becomes the prefix (e.g., src/aws/file.c → aws:). This may produce unexpected prefixes for some subdirectories. Consider whether all src/ subdirectories should generate prefixes this way, or if certain directories (like aws) should have special handling.
.github/scripts/tests/test_commit_lint.py (3)
37-39: LGTM! Test correctly updated to unpack the new tuple return value and assert both prefixes and build_optional.
250-262: Test properly validates multi-component rejection. The updated test and assertion correctly verify that commits touching multiple non-build components are rejected, with the error message checking for "does not match files changed".
290-313: Good edge case coverage for build_optional semantics. The tests correctly verify:
- Empty file list → build_optional = False
- Unknown paths → build_optional = False
- Real component paths → build_optional = True
This ensures the build_optional flag behaves as documented.
src/flb_kafka.c (1)
96-99: Correct cleanup for rd_kafka_conf_t in error path. Switching from raw flb_free(kafka_cfg) to rd_kafka_conf_destroy(kafka_cfg) correctly matches the object's allocation API and avoids undefined behavior.
plugins/in_kafka/in_kafka.h (1)
57-60: Explicit MSK IAM opt-in flag looks correct. Adding aws_msk_iam as an explicit flag on the input config cleanly supports opt-in semantics and matches the gating logic in in_kafka.c.
include/fluent-bit/aws/flb_aws_msk_iam.h (1)
37-49: Header/API now matches the broker-driven MSK IAM design. The updated flb_aws_msk_iam_register_oauth_cb signature and parameter docs (config, kconf, opaque, brokers) match the implementation and correctly expose the broker-based activation flow.
plugins/out_kafka/kafka_config.h (1)
128-131: Output config mirrors the input's explicit MSK IAM flag. Introducing aws_msk_iam alongside msk_iam on the output side keeps semantics consistent with the input plugin and enables explicit IAM opt-in.
dockerfiles/Dockerfile (1)
22-60: Debian 13 / distroless 13 migration and jemalloc options wiring look coherent. The move to debian:trixie-slim / cc-debian13, the updated lib* t64 package set, and the optional FLB_JEMALLOC_OPTIONS plumbed through to CMake form a consistent story for newer bases and allocator tuning. Please just confirm that all listed packages (especially libssl3t64, libcurl4t64, libgnutls30t64, libldap2) exist on all target architectures in your build matrix.
Also applies to: 74-77, 95-99, 121-178, 185-215, 216-269
plugins/in_kafka/in_kafka.c (2)
271-297: SASL/OAUTHBEARER handling and background refresh are well integrated. Reading rdkafka.sasl.mechanism up front, translating aws_msk_iam to OAUTHBEARER while recording explicit opt-in, and enabling both the SASL queue and background callbacks for all OAUTHBEARER cases is a solid way to fix idle-connection expiry without impacting non-IAM OAuth users.
Also applies to: 347-356, 404-424
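To illustrate the idle-refresh fix discussed above, here is a minimal, hypothetical sketch of routing SASL events to librdkafka's background thread. rd_kafka_conf_enable_sasl_queue() and rd_kafka_sasl_background_callbacks_enable() are real librdkafka (>= 1.9.0) APIs; the helper name and simplified error handling are illustrative, not the actual plugin code.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: enable background SASL/OAUTHBEARER token refresh so idle
 * connections keep valid credentials without relying on rd_kafka_poll(). */
static rd_kafka_t *create_client_with_background_sasl(rd_kafka_conf_t *conf)
{
    char errstr[512];
    rd_kafka_t *rk;
    rd_kafka_error_t *error;

    /* Route SASL events to a dedicated queue instead of the main queue. */
    rd_kafka_conf_enable_sasl_queue(conf, 1);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        /* conf was NOT consumed on failure; the caller must destroy it. */
        return NULL;
    }

    /* Let librdkafka's background thread serve the SASL queue, which
     * fires the OAuth refresh callback even while the client is idle. */
    error = rd_kafka_sasl_background_callbacks_enable(rk);
    if (error) {
        fprintf(stderr, "sasl background callbacks: %s\n",
                rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);
    }
    return rk;
}
```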
486-507: Init-error path now correctly cleans up the MSK IAM context. Adding flb_aws_msk_iam_destroy(ctx->msk_iam) to the init error path aligns the failure cleanup with in_kafka_exit() and prevents leaking the MSK IAM TLS/provider resources when initialization fails midway.
plugins/out_kafka/kafka_config.c (2)
61-87: Producer SASL/MSK IAM handling mirrors the input side correctly. The logic that detects rdkafka.sasl.mechanism=aws_msk_iam, records explicit opt-in, rewrites the mechanism to OAUTHBEARER, and defaults the security protocol to SASL_SSL is consistent with the input plugin and with librdkafka expectations.
252-267: rd_kafka_conf ownership is now handled correctly on producer creation success/failure. Conditionally nulling ctx->conf only after a successful rd_kafka_new(), and destroying either the producer handle or the conf (but not both) in flb_out_kafka_destroy(), aligns with librdkafka's ownership semantics and removes the earlier leak on creation failure.
Also applies to: 338-351
src/aws/flb_aws_msk_iam.c (3)
137-212: Broker-driven region detection and MSK IAM context setup look solid. The new extract_region_from_broker() helper and the reworked flb_aws_msk_iam_register_oauth_cb() correctly switch MSK IAM activation to a broker-driven model, avoid opaque type confusion, and initialize TLS plus the AWS credential provider with appropriate cleanup on every failure path. Region parsing for Standard, Serverless, and VPC endpoint hostnames looks reasonable and is well guarded by validation and logging.
Also applies to: 627-761
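To make the broker-driven model concrete, here is a rough, self-contained sketch of the kind of hostname parsing described above. The function name mirrors extract_region_from_broker(), but this is an illustration under stated assumptions, not the actual Fluent Bit implementation, and the real VPC-endpoint handling may differ in detail.

```c
#include <stdio.h>
#include <string.h>

/* Sketch: pull the AWS region out of an MSK broker hostname such as
 *   b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098
 *   boot-xyz.mycluster.kafka-serverless.us-east-1.amazonaws.com:9098 */
static int region_from_broker(const char *broker, char *out, size_t out_size)
{
    char host[256];
    char *p;
    char *end;
    char *start;

    /* copy and strip the optional :port */
    snprintf(host, sizeof(host), "%s", broker);
    p = strchr(host, ':');
    if (p) {
        *p = '\0';
    }

    /* the region is the label right before ".amazonaws.com" */
    end = strstr(host, ".amazonaws.com");
    if (!end || end == host) {
        return -1;
    }
    *end = '\0';
    start = strrchr(host, '.');
    if (!start) {
        return -1;
    }
    start++;

    /* VPC endpoint hostnames end in ".vpce.amazonaws.com"; step back one
     * more label so the region, not "vpce", is returned. */
    if (strcmp(start, "vpce") == 0) {
        *(start - 1) = '\0';
        start = strrchr(host, '.');
        if (!start) {
            return -1;
        }
        start++;
    }

    if (strlen(start) == 0 || strlen(start) >= out_size) {
        return -1;
    }
    strcpy(out, start);
    return 0;
}
```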
214-512: MSK IAM payload construction and cleanup look correct. The refactored build_msk_iam_payload() validates inputs, derives the SigV4 presigned URL, produces a URL-safe Base64 payload, and has a thorough single-exit error path that frees all intermediate SDS allocations. The memory-management and hashing/HMAC steps look consistent and leak-free; any further tweaks (e.g., preferring flb_sds_len over strlen on SDS values) are purely stylistic.
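For context on the encoding step mentioned above, here is a small, self-contained sketch of turning a standard Base64 string into the URL-safe, unpadded form used for the token payload. The function name and buffer handling are hypothetical; this is not the actual build_msk_iam_payload() code.

```c
#include <stdlib.h>
#include <string.h>

/* Sketch: convert standard Base64 to URL-safe Base64 without padding. */
static char *base64_to_urlsafe(const char *b64)
{
    size_t len = strlen(b64);
    char *out = malloc(len + 1);
    size_t i, j = 0;

    if (!out) {
        return NULL;
    }
    for (i = 0; i < len; i++) {
        if (b64[i] == '+') {
            out[j++] = '-';          /* '+' -> '-' */
        }
        else if (b64[i] == '/') {
            out[j++] = '_';          /* '/' -> '_' */
        }
        else if (b64[i] == '=') {
            continue;                /* strip padding */
        }
        else {
            out[j++] = b64[i];
        }
    }
    out[j] = '\0';
    return out;
}
```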
514-625: OAuth refresh callback now safely synchronizes provider access and token lifetime. Wrapping the credential provider refresh/get_credentials calls with a mutex and using a fixed 5-minute token lifetime (backed by librdkafka's background refresh at ~80%) resolves the earlier concurrency and idle-expiry issues. The destroy function correspondingly cleans up the provider, TLS, region, and the mutex, so the MSK IAM context's lifecycle is well defined.
Also applies to: 763-785
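A rough, hypothetical sketch of the callback shape discussed here: rd_kafka_oauthbearer_set_token() and rd_kafka_oauthbearer_set_token_failure() are the real librdkafka APIs, while the context struct, the fetch_and_sign_credentials() helper, the principal name, and the 300-second lifetime are assumptions for illustration only.

```c
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include <librdkafka/rdkafka.h>

#define TOKEN_LIFETIME_SECONDS 300        /* assumed 5-minute lifetime */

/* Hypothetical context; the real one lives in src/aws/flb_aws_msk_iam.c */
struct msk_iam_ctx {
    pthread_mutex_t lock;
    /* credential provider, region, TLS handle, ... */
};

/* Hypothetical helper: refreshes credentials and returns a signed,
 * URL-safe Base64 token string (caller frees it). */
static char *fetch_and_sign_credentials(struct msk_iam_ctx *ctx);

static void token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config,
                             void *opaque)
{
    struct msk_iam_ctx *ctx = opaque;
    char errstr[512];
    char *token;
    int64_t lifetime_ms;

    (void) oauthbearer_config;

    /* Serialize provider access so librdkafka's background thread cannot
     * race any other user of the credential provider. */
    pthread_mutex_lock(&ctx->lock);
    token = fetch_and_sign_credentials(ctx);
    pthread_mutex_unlock(&ctx->lock);

    if (!token) {
        rd_kafka_oauthbearer_set_token_failure(rk, "credential refresh failed");
        return;
    }

    /* md_lifetime_ms is an absolute wallclock expiry in milliseconds;
     * librdkafka schedules the next refresh at roughly 80% of it. */
    lifetime_ms = ((int64_t) time(NULL) + TOKEN_LIFETIME_SECONDS) * 1000;
    if (rd_kafka_oauthbearer_set_token(rk, token, lifetime_ms,
                                       "fluent-bit" /* assumed principal */,
                                       NULL, 0, errstr, sizeof(errstr)) !=
        RD_KAFKA_RESP_ERR_NO_ERROR) {
        rd_kafka_oauthbearer_set_token_failure(rk, errstr);
    }
    free(token);
}
```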
    int ret;
    FILE *f;
-   char line[512];
+   char line[2048];
Verify that this change belongs in this PR.
This buffer size increase from 512 to 2048 bytes appears unrelated to the PR's stated objective of implementing AWS MSK IAM authentication for Kafka. The node_exporter_metrics plugin has no connection to the Kafka or AWS MSK features described in the PR summary.
Was this file included accidentally, or is there a specific reason for bundling this change with the MSK IAM feature?
🧰 Tools
🪛 Cppcheck (2.18.0): only the standard informational notices (normalCheckLevelMaxBranches, toomanyconfigs).
Actionable comments posted: 1
♻️ Duplicate comments (2)
plugins/out_kafka/kafka_config.c (1)
61-87: SASL/MSK IAM handling, background refresh, and conf ownership are wired correctly
- Using rdkafka.sasl.mechanism=aws_msk_iam as the only activation path, then rewriting it to OAUTHBEARER and defaulting rdkafka.security.protocol to SASL_SSL when unset, keeps MSK IAM explicit and avoids surprising generic OAUTHBEARER users.
- Gating MSK IAM registration on ctx->aws_msk_iam and AWS-shaped broker hostnames prevents accidental IAM enablement for non-MSK clusters while still covering Standard, Serverless, and VPC endpoint patterns.
- Creating a shared opaque and setting it once on the conf, then letting MSK IAM add its context via the same opaque, cleanly reuses the same callback payload for both logging and token refresh.
- Enabling the SASL queue on OAUTHBEARER confs and then enabling SASL background callbacks on the producer handle is exactly what's needed to keep tokens fresh on idle connections without relying on rd_kafka_poll() from the flush path.
- The updated rd_kafka_new/destroy pairing (only nulling ctx->conf on success and explicitly destroying ctx->conf when rk was never created) matches librdkafka's ownership semantics and closes the previous leak on producer-creation failure while avoiding double-frees in the normal destroy path.
Please double-check that the minimum librdkafka version you support in Fluent Bit provides rd_kafka_conf_enable_sasl_queue() and rd_kafka_sasl_background_callbacks_enable(), and that your existing CI covers a failure of rd_kafka_new() to ensure the conf-cleanup path behaves as expected.
rd_kafka_conf_enable_sasl_queue()andrd_kafka_sasl_background_callbacks_enable(), and that your existing CI covers a failure ofrd_kafka_new()to ensure the conf‑cleanup path behaves as expected.Also applies to: 197-208, 209-219, 220-249, 252-267, 338-351
plugins/in_kafka/in_kafka.c (1)
271-297: Consumer MSK IAM, SASL background refresh, and cleanup paths are aligned and robust
- Mapping rdkafka.sasl.mechanism=aws_msk_iam to OAUTHBEARER while setting ctx->aws_msk_iam, and defaulting rdkafka.security.protocol to SASL_SSL only when not already set, keeps IAM opt-in and avoids interfering with other SASL modes.
- Enabling the SASL queue on the consumer conf and then enabling SASL background callbacks on the created handle gives the consumer the same idle-connection safety as the producer.
- The MSK IAM registration is now correctly gated on ctx->aws_msk_iam, an OAUTHBEARER mechanism, and AWS MSK-shaped brokers, and passes the brokers string explicitly to flb_aws_msk_iam_register_oauth_cb, avoiding previous opaque/type-confusion issues.
- The rd_kafka_new/kafka_conf ownership logic now correctly keeps kafka_conf alive on failure so init_error can destroy it, and nulls kafka_conf only on success, matching librdkafka's rules.
- init_error and in_kafka_exit both now destroy ctx->msk_iam when present, so MSK IAM TLS/provider resources are released on both init failures and normal shutdown.
Please confirm that your minimum librdkafka version supports rd_kafka_conf_enable_sasl_queue() and rd_kafka_sasl_background_callbacks_enable() for consumers as used here, and that your tests cover both successful and failed consumer initialization with MSK IAM enabled to exercise the new cleanup paths.
Also applies to: 339-356, 358-389, 391-403, 412-424, 486-507, 549-552
🧹 Nitpick comments (1)
.github/scripts/commit_prefix_check.py (1)
30-86: Path-based prefix inference and the build_optional flag look sound. The new (prefixes, build_optional) return shape and the split between component_prefixes and build_seen give you enough information to (a) allow build: for pure build-system commits and (b) keep it optional when any lib/tests/plugins/src files are touched. Normalizing paths and deriving per-area prefixes here should keep the validate logic straightforward.
If you want to avoid ever suggesting CMakeLists.txt: to authors, you could special-case src/CMakeLists.txt similarly to the build path (map it to build: instead of CMakeLists.txt:), but that's purely cosmetic.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
- .github/scripts/commit_prefix_check.py (4 hunks)
- .github/scripts/tests/test_commit_lint.py (4 hunks)
- .github/workflows/commit-lint.yaml (2 hunks)
- CMakeLists.txt (1 hunks)
- dockerfiles/Dockerfile (11 hunks)
- include/fluent-bit/aws/flb_aws_msk_iam.h (1 hunks)
- plugins/in_elasticsearch/in_elasticsearch.c (1 hunks)
- plugins/in_exec_wasi/in_exec_wasi.c (1 hunks)
- plugins/in_forward/fw.c (3 hunks)
- plugins/in_forward/fw_prot.c (2 hunks)
- plugins/in_kafka/in_kafka.c (4 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/in_node_exporter_metrics/ne_utils.c (1 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunks)
- src/aws/flb_aws_credentials_profile.c (1 hunks)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (12 hunks)
- src/flb_kafka.c (1 hunks)
💤 Files with no reviewable changes (1)
- plugins/out_kafka/kafka.c
🚧 Files skipped from review as they are similar to previous changes (2)
- include/fluent-bit/aws/flb_aws_msk_iam.h
- plugins/in_kafka/in_kafka.h
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
CMakeLists.txt, dockerfiles/Dockerfile, src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.
Applied to files:
CMakeLists.txt
📚 Learning: 2025-09-04T12:32:46.030Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10825
File: plugins/out_s3/s3.c:0-0
Timestamp: 2025-09-04T12:32:46.030Z
Learning: In Fluent Bit plugins, avoid creating plugin-local config_map entries that duplicate core functionality. The core already provides Retry_Limit parsed into ins->retry_limit, so plugins should use that directly rather than defining their own retry_limit configuration option.
Applied to files:
plugins/in_exec_wasi/in_exec_wasi.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
dockerfiles/Dockerfile, src/aws/flb_aws_msk_iam.c, src/flb_kafka.c
📚 Learning: 2025-09-14T09:46:09.531Z
Learnt from: aminvakil
Repo: fluent/fluent-bit PR: 10844
File: conf/fluent-bit:13-15
Timestamp: 2025-09-14T09:46:09.531Z
Learning: For fluent-bit Debian packaging, /opt/fluent-bit/bin/ is the appropriate installation path since the package may be installed from non-official Debian sources, making /opt compliant with FHS for optional software packages.
Applied to files:
dockerfiles/Dockerfile
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (4)
plugins/in_kafka/in_kafka.c (4)
src/flb_input.c (1)
flb_input_get_property (776-780)
src/flb_sds.c (2): flb_sds_create (78-90), flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (2): flb_aws_msk_iam_register_oauth_cb (628-761), flb_aws_msk_iam_destroy (764-786)
src/flb_kafka.c (1): flb_kafka_opaque_destroy (233-240)
plugins/out_kafka/kafka_config.c (3)
src/flb_output.c (2)
flb_output_get_property (1108-1111), flb_output_set_property (843-1068)
src/flb_sds.c (2): flb_sds_create (78-90), flb_sds_destroy (389-399)
src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (628-761)
.github/scripts/tests/test_commit_lint.py (1)
.github/scripts/commit_prefix_check.py (2): infer_prefix_from_paths (30-85), validate_commit (123-213)
src/aws/flb_aws_msk_iam.c (4)
src/aws/flb_aws_credentials.c (1)
flb_aws_credentials_destroy (752-767)
src/tls/flb_tls.c (2): flb_tls_create (183-232), flb_tls_destroy (258-277)
src/aws/flb_aws_util.c (1): flb_aws_client_generator (280-283)
src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
🪛 Cppcheck (2.18.0)
For plugins/in_forward/fw_prot.c, src/aws/flb_aws_credentials_profile.c, plugins/in_elasticsearch/in_elasticsearch.c, plugins/in_exec_wasi/in_exec_wasi.c, plugins/in_kafka/in_kafka.c, plugins/out_kafka/kafka_config.c, plugins/in_node_exporter_metrics/ne_utils.c, src/aws/flb_aws_msk_iam.c, src/aws/flb_aws_credentials_ec2.c, plugins/in_forward/fw.c, src/aws/flb_aws_credentials_sts.c, and src/flb_kafka.c, cppcheck reported only informational notices: limited branch analysis (normalCheckLevelMaxBranches; use --check-level=exhaustive to analyze all branches), too many #ifdef configurations (toomanyconfigs; use --force to check all configurations), and missing system include files (missingIncludeSystem) on several include lines in plugins/out_kafka/kafka_config.c and src/aws/flb_aws_msk_iam.c.
🔇 Additional comments (29)
plugins/in_elasticsearch/in_elasticsearch.c (1)
237-241: Config help text update is clear and consistent. The added description for the http2 option is accurate and matches the surrounding config_map style. No further changes needed.
src/aws/flb_aws_credentials_ec2.c (1)
132-134: LGTM! This is a minor formatting change adding a blank line for readability. No functional impact.
src/aws/flb_aws_credentials_sts.c (1)
173-186: Whitespace-only change; no functional impact. The added blank lines in refresh_fn_sts and refresh_fn_eks are purely cosmetic and do not affect behavior, error handling, or locking. The change looks fine as-is.
Also applies to: 478-489
plugins/in_forward/fw_prot.c (2)
1149-1151: Unrelated changes bundled in this PR, but the improvements look good. This file (plugins/in_forward/fw_prot.c) is unrelated to the PR's stated objective of adding AWS MSK IAM authentication for the Kafka plugins. However, the changes themselves are improvements:
- Using the CTR_DECODE_MSGPACK_SUCCESS constant instead of the magic number -1 improves code clarity.
- Using flb_plg_error provides proper plugin-scoped logging.
Consider separating unrelated changes into a dedicated PR to maintain a clear change history and simplify review.
1162-1162: Good documentation of ownership semantics. The ownership comment clarifies that flb_input_trace_append takes ownership of the ctr object and destroys it on success, preventing a double-free bug. This is a valuable addition for maintainability.
Note that this differs from the METRICS path (lines 1141-1144), where cmt_decode_msgpack_destroy(cmt) is called explicitly after flb_input_metrics_append. This asymmetry appears intentional, based on different ownership semantics between the two APIs.
As noted previously, these changes are unrelated to the PR's AWS MSK IAM objectives.
dockerfiles/Dockerfile (5)
74-77: Jemalloc configuration is properly wired. The ARG → ENV → CMake propagation pattern is correct. The optional nature of FLB_JEMALLOC_OPTIONS is clearly documented in the comment. No issues identified.
Also applies to: 95-95
216-216: Debug stage updates align with the trixie migration. The debug stage properly includes t64-suffixed runtime libraries and appropriate development tools. The addition of LDAP support (libldap2) is consistent with other stages.
Also applies to: 237-268
185-185: Distroless image upgrade to cc-debian13 is confirmed and appropriate. The gcr.io/distroless/cc-debian13 image is available and actively maintained by Google Container Tools with support across multiple architectures and variant tags (nonroot, debug). This aligns properly with the Debian trixie baseline.
22-22: Verify Debian trixie-slim compatibility before merging. The upgrade from bookworm-slim to trixie-slim involves significant OS-level changes documented by Debian: glibc ABI bumps, package removals, and potential service configuration incompatibilities. Ensure:
- All build dependencies (build-essential, libssl-dev, libcurl4-openssl-dev, libsystemd-dev, etc.) resolve correctly in trixie
- The CMake toolchain is compatible with trixie's compiler versions
- The distroless cc-debian13 image is available and properly tested
- All package name updates with t64 suffixes are complete across builder, deb-extractor, and debug stages
129-170: Both libnghttp2-14 and libnghttp3-9 are intentionally included and do not require changes. The deb-extractor stage correctly includes both libraries: libnghttp2 provides HTTP/2 support while libnghttp3 provides HTTP/3 support. This dual-library pattern is a standard practice in HTTP clients (such as curl) that need to support multiple protocol versions with automatic fallback. This is not an inconsistency.
The t64-suffixed packages (libssl3t64, libcurl4t64, etc.) are correctly named for the Debian trixie 64-bit time_t ABI transition and are the appropriate versions for trixie-slim base images. The LDAP packages (libldap-2.5 in the builder, libldap2 in deb-extractor) are properly included for MSK IAM SASL authentication support in the distroless multi-stage build pattern, where runtime shared libraries must be extracted and copied into the runtime image.
Likely an incorrect or invalid review comment.
plugins/in_node_exporter_metrics/ne_utils.c (1)
156-157: LGTM! The buffer size increase from 512 to 2048 bytes is reasonable for handling longer lines in proc/sys filesystem files, and aligns with the existing real_path buffer size for consistency.
src/aws/flb_aws_credentials_profile.c (1)
664-673: LGTM! Downgrading the log level to AWS_CREDS_DEBUG for a missing credentials file (ENOENT) is appropriate. This is an expected condition in environments using alternative credential sources (IAM roles, environment variables, ECS/EC2 metadata). The function still correctly returns -1 to signal the caller to try other credential providers.
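As a rough illustration of the logging pattern described above (function name and messages are hypothetical, and plain stderr stands in for the AWS_CREDS_DEBUG macro), a missing shared-credentials file is treated as an expected condition while other I/O errors remain noisy:

```c
#include <stdio.h>
#include <errno.h>

/* Sketch: a missing credentials file is normal when credentials come from
 * elsewhere (env vars, IMDS, IRSA), so it is logged at debug level only. */
static FILE *open_credentials_file(const char *path)
{
    FILE *f = fopen(path, "r");

    if (!f) {
        if (errno == ENOENT) {
            fprintf(stderr, "[debug] credentials file %s not found; "
                            "trying other providers\n", path);
        }
        else {
            fprintf(stderr, "[error] could not open credentials file %s\n",
                    path);
        }
        return NULL;   /* caller falls through to the next provider */
    }
    return f;
}
```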
CMakeLists.txt (1)
86-93: LGTM! Good security hardening. Adding the -Wa,--noexecstack and -Wl,-z,noexecstack flags unconditionally for Linux builds is a solid security improvement. This prevents executable-stack vulnerabilities and silences kernel warnings. The platform check correctly excludes macOS, where these flags aren't supported.
plugins/in_forward/fw.c (1)
484-519: LGTM! The configuration description updates improve clarity by providing better context about secure forward authentication. These documentation enhancements will help users understand the purpose of each option.
src/flb_kafka.c (1)
96-100: Good fix for proper resource cleanup. Using rd_kafka_conf_destroy() instead of flb_free() is correct. The rd_kafka_conf_t object is allocated by librdkafka via rd_kafka_conf_new() and contains internal structures that must be freed using the library's destructor to avoid memory leaks and corruption.
.github/scripts/tests/test_commit_lint.py (5)
37-39: LGTM! Tests correctly updated to unpack the new tuple return value (prefixes, build_optional) from infer_prefix_from_paths(). The assertion for build_optional is True is correct since plugin files represent real components.
61-67: LGTM! Test correctly verifies that multiple files from different components produce multiple prefixes, and that build_optional is True when real components are touched.
250-261: LGTM! Test and assertion correctly updated. The error message check changed from "Expected one of:" to "does not match files changed", which aligns with the updated validate_commit logic that rejects multi-component commits that require splitting.
294-297: LGTM! Correctly verifies that an empty file list returns an empty prefix set with build_optional as False, since no components or CMakeLists.txt were touched.
607-622: LGTM! Test docstring and logic correctly updated to reflect that multi-component commits must be split, even when the subject matches one component. The assertions properly verify that both expected prefixes are reported.
.github/workflows/commit-lint.yaml (2)
26-26: No action needed. actions/setup-python@v6 is a valid version (currently at v6.1.0) and will not cause CI failures.
16-16: No action needed: actions/checkout@v6 is a valid, stable release. As of December 2025, actions/[email protected] has been released and is stable. Similarly, actions/setup-python@v6 (v6.1.0 as of late November 2025) is also available. These version bumps are legitimate and will not cause workflow failures.
Likely an incorrect or invalid review comment.
plugins/out_kafka/kafka_config.h (1)
128-131: AWS MSK IAM fields are well scoped and clear. Keeping both msk_iam and the aws_msk_iam flag under #ifdef FLB_HAVE_AWS_MSK_IAM cleanly scopes AWS-specific state, and the comment makes the explicit opt-in semantics obvious. Looks good and consistent with the input plugin struct.
.github/scripts/commit_prefix_check.py (1)
160-213: Multi-component and build: prefix rules are consistent with the inferred prefixes. The new logic that:
- filters out build:/cmakelists.txt: from non_build_prefixes,
- requires an umbrella lib: prefix when multiple non-build areas are touched, and
- enforces build: only when build_optional is false
gives clear, deterministic behavior for mixed commits while still allowing flexible subjects when only docs/tools are touched (expected empty). The error messaging listing "Expected one of: ..." will also be helpful when commits span several areas.
137-212: Region extraction from broker covers Standard, Serverless, and VPC endpoint formats. The
extract_region_from_broker() logic that:
- strips the optional
:port,- locates
.amazonaws.com(or.vpce.amazonaws.comvia the.vpcecheck), and- walks back to the preceding dot to slice out the region
correctly handles the documented MSK Standard, Serverless, and VPC endpoint hostnames while rejecting malformed or path‑like inputs. The explicit length sanity check on the region string is a nice guardrail.
215-512: MSK IAM payload construction and cleanup look correct; consider double‑checking expiry choicesThe refactored
build_msk_iam_payload():
- Validates
config->region,host, and credentials up front.- Builds the credential scope, canonical query (including optional session token), canonical request, and signing key following AWS SigV4 conventions.
- Constructs the full presigned URL, base64‑encodes it, then normalizes it to URL‑safe Base64 and strips padding.
- Has a comprehensive
error:path that frees every intermediateflb_sds_tand avoids double frees by nullingempty_payload_hexafter the happy‑path destroy.The only behavioral point worth a second look is that the presigned URL uses
X-Amz-Expires=900(15 minutes) while the OAuth token lifetime is fixed at 300 seconds viaMSK_IAM_TOKEN_LIFETIME_SECONDS. Running the OAuth lifetime shorter than the presign expiry is likely safe (and conservative), but it would be good to confirm this matches the latest AWS MSK IAM recommendations.If you haven’t already, please verify against the current MSK IAM docs that a 5‑minute OAuth token lifetime combined with a 900‑second
X-Amz-Expiresis the intended combination (or adjust one of them if AWS expects them to match).
514-625: OAuth refresh callback now safely serializes provider access and cleans up credentialsThe updated
oauthbearer_token_refresh_cb()looks solid:
- It validates the opaque context and region and derives the MSK endpoint host based on
is_serverless.- It uses
pthread_mutex_lock/unlockaroundprovider->refresh()andget_credentials()so librdkafka’s background thread doesn’t race with any other potential provider users.- It bails early with
rd_kafka_oauthbearer_set_token_failure()and clear log messages when refresh or credentials retrieval fails.- It hands credentials to
build_msk_iam_payload(), sets a token lifetime based onMSK_IAM_TOKEN_LIFETIME_SECONDSas an absolute epoch time, and always destroys the credentials and payload afterward.This is a good concurrency and lifecycle story for the MSK IAM provider.
Since this callback is driven by librdkafka internals, please ensure your shutdown paths (
rd_kafka_destroyfor producer/consumer followed byflb_aws_msk_iam_destroy) are exercised in tests so there’s no window where the provider is freed while a background refresh callback might still be in flight.
627-761: MSK IAM registration lifecycle and opaque usage are clean; just confirm flb_kafka_opaque_set semanticsThe registration function now:
- Validates
opaqueandbrokersexplicitly.- Parses the first broker, extracts the region via
extract_region_from_broker(), and detects Serverless vs Standard via.kafka-serverless..- Allocates
ctx, setsctx->region, creates a TLS instance, and wires a standard AWS provider (sync/init/async), aborting with full cleanup on each failure path.- Initializes a mutex guarding provider access.
- Calls
flb_kafka_opaque_set(opaque, NULL, ctx)and then registers both the opaque and the OAuth refresh callback on the conf.- Provides a matching
flb_aws_msk_iam_destroy()that tears down provider, TLS, region, mutex, and the struct itself.The only thing to double‑check is how
flb_kafka_opaque_set()behaves when called twice: Kafka plugins first callflb_kafka_opaque_set(ctx->opaque, ctx, NULL)to store the plugin context, and here you call it again with(opaque, NULL, ctx)to add the MSK IAM context. As long asflb_kafka_opaque_set()only overwrites the non‑NULL argument and leaves the existing plugin pointer intact, this is perfect; if it instead blindly overwrites both fields, you’d lose the plugin context for log/message callbacks.It’s worth quickly inspecting
flb_kafka_opaque_set()(insrc/flb_kafka.c) to confirm it treats NULL arguments as “don’t change this field”; if it does not, updating it to do so will prevent clobberingopaque->ptrwhen MSK IAM is registered.
763-785: MSK IAM destroy path matches initialization and avoids leaks
flb_aws_msk_iam_destroy()mirrors the successful registration path by:
- Destroying the AWS provider if present,
- Destroying the TLS context if present,
- Freeing the region SDS,
- Destroying the mutex, and
- Finally freeing the struct.
Given that the registration function only returns a fully initialized
ctx, this cleanup order is appropriate and should prevent leaks of TLS/provider resources.
| FLB_CONFIG_MAP_BOOL, "oneshot", "false", | ||
| 0, FLB_TRUE, offsetof(struct flb_exec_wasi, oneshot), | ||
| "execute the command only once" | ||
| }, |
This change is unrelated to the PR scope and should be in a separate PR.
The PR objectives state this PR adds "AWS MSK IAM authentication support" to the Kafka plugins (out_kafka/in_kafka), but this file (in_exec_wasi.c) is completely unrelated to Kafka or AWS functionality.
While the config key rename from "bool" to "oneshot" is a valid fix, it belongs in a separate PR. Per the PR comments, the maintainer (cosmo0920) specifically flagged concerns about commit prefixes not matching changed files, and mixing unrelated changes violates that principle.
Recommendation: Remove this change from the current PR and submit it separately with an appropriate commit message like in_exec_wasi: fix config key name from 'bool' to 'oneshot'.
🧰 Tools
🪛 Cppcheck (2.18.0): only the standard informational notices (normalCheckLevelMaxBranches, toomanyconfigs).
🤖 Prompt for AI Agents
plugins/in_exec_wasi/in_exec_wasi.c lines 456-459: this change (renaming or
adding the "oneshot" config key) is unrelated to the AWS MSK IAM/Kafka feature
and must be removed from this PR; revert the edit in this file so the current PR
only contains Kafka/AWS-related changes, then create a separate branch/commit
with the isolated fix and open a new PR titled like "in_exec_wasi: fix config
key name from 'bool' to 'oneshot'".
|
Unfortunately, your commits still do not follow our linting rule:
Summary
Add comprehensive AWS MSK IAM authentication support with simplified configuration and fix OAuth token expiration on idle connections. This PR automatically extracts region and cluster type information from broker addresses, provides explicit opt-in for MSK IAM, enhances OAUTHBEARER token refresh for all OAuth methods, and enables automatic background token refresh to prevent authentication failures on idle connections.
Changes
Key Features
Explicit MSK IAM Opt-in
- rdkafka.sasl.mechanism=aws_msk_iam
- aws_msk_iam flag to track user intent
Simplified Configuration
- No cluster_arn parameter
- rdkafka.sasl.mechanism=aws_msk_iam is mapped to OAUTHBEARER internally and registers the OAuth callback
Automatic Region Extraction
Automatic Cluster Type Detection
Universal OAUTHBEARER Enhancements
OAuth Token Lifetime Management
rd_kafka_poll()TLS Support for AWS Credentials
Technical Details
Explicit MSK IAM Activation:
Configuration Simplification:
- rdkafka.sasl.mechanism=aws_msk_iam is translated to OAUTHBEARER and registers the OAuth callback
- rdkafka.security.protocol=SASL_SSL (if not configured)
Region Extraction Logic:
- Extracts the region from the first broker address (e.g., b-1.example.kafka.us-east-1.amazonaws.com)
- Standard MSK: *.kafka.<region>.amazonaws.com
- Serverless MSK: *.kafka-serverless.<region>.amazonaws.com
Cluster Type Detection:
- Checks the broker hostname for .kafka-serverless. to determine the cluster type (kafka or kafka-serverless)
Universal OAUTHBEARER Background Processing:
Modified Files
- include/fluent-bit/aws/flb_aws_msk_iam.h - Updated function signature (removed cluster_arn parameter)
- src/aws/flb_aws_msk_iam.c - Refactored region extraction and cluster type detection logic
- plugins/in_kafka/in_kafka.h - Added aws_msk_iam flag, removed deprecated fields
- plugins/in_kafka/in_kafka.c - Added explicit MSK IAM activation, universal OAUTHBEARER support
- plugins/out_kafka/kafka_config.h - Added aws_msk_iam flag, removed deprecated fields
- plugins/out_kafka/kafka_config.c - Added explicit MSK IAM activation, universal OAUTHBEARER support
- plugins/out_kafka/kafka.c - Removed deprecated configuration mapping
Configuration
Simple AWS MSK IAM Setup:
[INPUT]
    Name                    kafka
    Brokers                 b-1.example.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism  aws_msk_iam
No
cluster_arn or additional AWS-specific parameters needed!
Supported Configurations
This PR ensures compatibility with multiple OAuth scenarios:
1. AWS MSK IAM (Fluent Bit convenience syntax)
[INPUT]
    Name                    kafka
    Brokers                 b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism  aws_msk_iam
2. librdkafka OIDC (unaffected by MSK IAM)
[INPUT]
    Name                                         kafka
    Brokers                                      b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism                       OAUTHBEARER
    rdkafka.sasl.oauthbearer.method              oidc
    rdkafka.sasl.oauthbearer.client.id           my_client_id
    rdkafka.sasl.oauthbearer.client.secret       my_secret
    rdkafka.sasl.oauthbearer.token.endpoint.url  https://auth.example.com/token
3. librdkafka AWS method (unaffected by MSK IAM)
[INPUT]
    Name                             kafka
    Brokers                          b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism           OAUTHBEARER
    rdkafka.sasl.oauthbearer.method  aws
All configurations benefit from automatic background token refresh!
Design for Extensibility
This PR establishes a clean, extensible pattern for adding cloud provider IAM authentication:
1. Layered Configuration Approach
2. Explicit Opt-in Pattern
3. Benefits of This Design
4. Future Extensions
This architecture makes it straightforward to add:
Each can be added with the same explicit opt-in pattern without affecting existing functionality.
OAuth Token Expiration Fix
Problem Statement:
After prolonged idle periods (5+ minutes), Kafka outputs experienced authentication failures:
Root Cause:
librdkafka's OAuth token refresh mechanism relies on
rd_kafka_poll()being called regularly. For idle connections,rd_kafka_poll()is only called when producing messages. This is documented in librdkafka issue #3871:Timeline without background callbacks:
Solution: Background Callbacks
librdkafka v1.9.0+ provides
rd_kafka_sasl_background_callbacks_enable()specifically for this use case:Timeline with background callbacks:
Benefits:
rd_kafka_poll()not required)TLS Support
This PR includes proper TLS support for AWS credential fetching:
Features:
Usage:
Testing
Packaging
ok-package-testlabel to test for all targets (requires maintainer to do)Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Chores
✏️ Tip: You can customize this high-level summary in your review settings.